From: Jeroen van der Heijden Date: Thu, 27 Sep 2018 13:07:17 +0000 (+0200) Subject: change buffer X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~8^2~60 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=09b811d269554848ab37942b6142ce740447402c;p=siridb-server.git change buffer --- diff --git a/include/siri/db/buffer.h b/include/siri/db/buffer.h index 50c30baa..c5e60a78 100644 --- a/include/siri/db/buffer.h +++ b/include/siri/db/buffer.h @@ -27,16 +27,13 @@ int siridb_buffer_open(siridb_t * siridb); int siridb_buffer_load(siridb_t * siridb); -int siridb_buffer_write_len( +int siridb_buffer_write_empty( siridb_t * siridb, siridb_series_t * series); - -int siridb_buffer_write_point( +int siridb_buffer_write_last_point( siridb_t * siridb, - siridb_series_t * series, - uint64_t * ts, - qp_via_t * val); + siridb_series_t * series); int siridb_buffer_fsync(siridb_t * siridb); diff --git a/itest/test_insert.py b/itest/test_insert.py index c4eedeba..7956aaa3 100644 --- a/itest/test_insert.py +++ b/itest/test_insert.py @@ -24,7 +24,8 @@ TIME_PRECISION = 'ns' class TestInsert(TestBase): title = 'Test inserts and response' - GEN_POINTS = functools.partial(gen_points, n=1, time_precision=TIME_PRECISION) + GEN_POINTS = functools.partial( + gen_points, n=1, time_precision=TIME_PRECISION) async def _test_series(self, client): @@ -34,20 +35,30 @@ class TestInsert(TestBase): result = await client.query('select * from "series int"') self.assertEqual(result['series int'], self.series_int) - result = await client.query('list series name, length, type, start, end') + result = await client.query( + 'list series name, length, type, start, end') result['series'].sort() self.assertEqual( result, { 'columns': ['name', 'length', 'type', 'start', 'end'], 'series': [ - ['series float', 10000, 'float', self.series_float[0][0], self.series_float[-1][0]], - ['series int', 10000, 'integer', self.series_int[0][0], self.series_int[-1][0]], + [ + 'series float', + 10000, 'float', + self.series_float[0][0], + self.series_float[-1][0]], + [ + 'series int', 10000, + 'integer', + self.series_int[0][0], + self.series_int[-1][0]], ] }) async def insert(self, client, series, n, timeout=1): for _ in range(n): - await client.insert_some_series(series, timeout=timeout, points=self.GEN_POINTS) + await client.insert_some_series( + series, timeout=timeout, points=self.GEN_POINTS) await asyncio.sleep(1.0) @default_test_setup(2, time_precision=TIME_PRECISION, compression=False) @@ -62,9 +73,11 @@ class TestInsert(TestBase): await self.client0.insert([]), {'success_msg': 'Successfully inserted 0 point(s).'}) - self.series_float = gen_points(tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') + self.series_float = gen_points( + tp=float, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') random.shuffle(self.series_float) - self.series_int = gen_points(tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') + self.series_int = gen_points( + tp=int, n=10000, time_precision=TIME_PRECISION, ts_gap='5m') random.shuffle(self.series_int) self.assertEqual( @@ -96,6 +109,13 @@ class TestInsert(TestBase): with self.assertRaises(InsertError): await self.client0.insert({'no points': [[]]}) + self.assertEqual( + await self.client0.insert({ + 'ts_zero': [[0, 1]] + }), {'success_msg': 'Successfully inserted 1 point(s).'}) + + await self.client0.query('drop series "ts_zero"') + with self.assertRaises(InsertError): await self.client0.insert([{'name': 'no points', 'points': []}]) @@ -103,6 +123,11 @@ class TestInsert(TestBase): with self.assertRaises(InsertError): await self.client0.insert({'invalid ts': [[0.5, 6]]}) + # timestamps should be interger values + with self.assertRaises(InsertError): + await self.client0.insert( + {'invalid ts': [[-1, 6]]}) + # empty series name is not allowed with self.assertRaises(InsertError): await self.client0.insert({'': [[1, 0]]}) diff --git a/src/siri/db/buffer.c b/src/siri/db/buffer.c index 3306f39f..5aac01df 100644 --- a/src/siri/db/buffer.c +++ b/src/siri/db/buffer.c @@ -19,6 +19,7 @@ #include #include #include +#include #define SIRIDB_BUFFER_FN "buffer.dat" @@ -28,23 +29,25 @@ static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series); static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series); +static const uint64_t BUFFER_end = 0xffffffffffffffff; + /* * Returns 0 if success or EOF in case of an error. */ -int siridb_buffer_write_len( +int siridb_buffer_write_empty( siridb_t * siridb, siridb_series_t * series) { return ( /* go to the series position in buffer */ - fseeko( siridb->buffer_fp, - series->bf_offset + sizeof(uint32_t), + fseeko( siridb->buffer_fp, + series->bf_offset + 8, // 4 bytes are unused SEEK_SET) || - /* write new length */ - fwrite( &series->buffer->len, - sizeof(size_t), + /* write end ts */ + fwrite( &BUFFER_end, + sizeof(uint64_t), 1, siridb->buffer_fp) != 1) ? EOF : 0; } @@ -55,25 +58,30 @@ int siridb_buffer_write_len( * * Returns 0 if success or EOF in case of an error. */ -int siridb_buffer_write_point( +int siridb_buffer_write_last_point( siridb_t * siridb, - siridb_series_t * series, - uint64_t * ts, - qp_via_t * val) + siridb_series_t * series) { - const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t); + siridb_point_t * point; + const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t) + sizeof(uint64_t); char buf[sz]; + int last_idx = series->buffer->len - 1; + assert (last_idx >= 0); - memcpy(buf, ts, sizeof(uint64_t)); - memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t)); + point = series->buffer->data + last_idx; - return ( - siridb_buffer_write_len(siridb, series) || + memcpy(buf, &point->ts, sizeof(uint64_t)); + memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t)); + memcpy( + buf + sizeof(uint64_t) + sizeof(qp_via_t), + &BUFFER_end, + sizeof(uint64_t)); + return ( /* jump to position where to write the new point */ - fseeko( siridb->buffer_fp, - 16 * (series->buffer->len - 1), - SEEK_CUR) || + fseeko( siridb->buffer_fp, + series->bf_offset + 8 + (16 * last_idx), + SEEK_SET) || /* write time-stamp and value */ fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0; diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 226fbc02..2d8b3627 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -145,7 +145,7 @@ int siridb_series_add_point( else { series->buffer->len = 0; - if (siridb_buffer_write_len(siridb, series)) + if (siridb_buffer_write_empty(siridb, series)) { ERR_FILE rc = -1; @@ -154,7 +154,7 @@ int siridb_series_add_point( } else { - if (siridb_buffer_write_point(siridb, series, ts, val)) + if (siridb_buffer_write_last_point(siridb, series)) { ERR_FILE log_critical("Cannot write new point to buffer"); @@ -217,7 +217,7 @@ int siridb_series_add_pcache( } series->buffer->len = 0; - if (siridb_buffer_write_len(siridb, series)) + if (siridb_buffer_write_empty(siridb, series)) { ERR_FILE return -1;